/*
 *  Copyright 1999-2019 Seata.io Group.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package io.seata.server.storage.redis.store;

import com.google.common.collect.ImmutableMap;
import io.seata.common.XID;
import io.seata.common.exception.RedisException;
import io.seata.common.exception.StoreException;
import io.seata.common.util.BeanUtils;
import io.seata.common.util.CollectionUtils;
import io.seata.common.util.StringUtils;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import io.seata.core.model.GlobalStatus;
import io.seata.core.store.BranchTransactionDO;
import io.seata.core.store.GlobalTransactionDO;
import io.seata.server.console.param.GlobalSessionParam;
import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionCondition;
import io.seata.server.session.SessionStatusValidator;
import io.seata.server.storage.SessionConverter;
import io.seata.server.storage.redis.JedisPooledFactory;
import io.seata.server.store.AbstractTransactionStoreManager;
import io.seata.server.store.SessionStorable;
import io.seata.server.store.TransactionStoreManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Transaction;

import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

import static io.seata.common.ConfigurationKeys.STORE_REDIS_QUERY_LIMIT;
import static io.seata.common.DefaultValues.DEFAULT_QUERY_LIMIT;
import static io.seata.core.constants.RedisKeyConstants.*;

/**
 * The redis transaction store manager
 *
 * @author funkye
 * @author wangzhongxiang
 * @author doubleDimple
 */
public class RedisTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager {

	private static final Logger LOGGER = LoggerFactory.getLogger(RedisTransactionStoreManager.class);

	/** the prefix of the branch transactions */
	private static final String REDIS_SEATA_BRANCHES_PREFIX = "SEATA_BRANCHES_";

	/** the prefix of the branch transaction */
	private static final String REDIS_SEATA_BRANCH_PREFIX = "SEATA_BRANCH_";

	/** the prefix of the global transaction */
	private static final String REDIS_SEATA_GLOBAL_PREFIX = "SEATA_GLOBAL_";

	/** the prefix of the global transaction status */
	private static final String REDIS_SEATA_STATUS_PREFIX = "SEATA_STATUS_";

	/** the key of global transaction status for begin */
	private static final String REDIS_SEATA_BEGIN_TRANSACTIONS_KEY = "SEATA_BEGIN_TRANSACTIONS";

	private static volatile RedisTransactionStoreManager instance;

	private static final String OK = "OK";

	/**
	 * The constant CONFIG.
	 */
	private static final Configuration CONFIG = ConfigurationFactory.getInstance();

	/**
	 * The Log query limit.
	 */
	private int logQueryLimit;

	/**
	 * Get the instance.
	 */
	public static RedisTransactionStoreManager getInstance() {
		if (instance == null) {
			synchronized (RedisTransactionStoreManager.class) {
				if (instance == null) {
					instance = new RedisTransactionStoreManager();
				}
			}
		}
		return instance;
	}

	/**
	 * init map to constructor
	 */
	public RedisTransactionStoreManager() {
		super();
		initGlobalMap();
		initBranchMap();
		logQueryLimit = CONFIG.getInt(STORE_REDIS_QUERY_LIMIT, DEFAULT_QUERY_LIMIT);
	}

	/**
	 * Map for LogOperation Global Operation
	 */
	public static volatile ImmutableMap<LogOperation, Function<GlobalTransactionDO, Boolean>> globalMap;

	/**
	 * Map for LogOperation Branch Operation
	 */
	public static volatile ImmutableMap<LogOperation, Function<BranchTransactionDO, Boolean>> branchMap;

	/**
	 * init globalMap
	 *
	 */
	public void initGlobalMap() {
		if (CollectionUtils.isEmpty(branchMap)) {
			globalMap = ImmutableMap.<LogOperation, Function<GlobalTransactionDO, Boolean>>builder()
					.put(LogOperation.GLOBAL_ADD, this::insertGlobalTransactionDO)
					.put(LogOperation.GLOBAL_UPDATE, this::updateGlobalTransactionDO)
					.put(LogOperation.GLOBAL_REMOVE, this::deleteGlobalTransactionDO).build();
		}
	}

	/**
	 * init branchMap
	 *
	 */
	public void initBranchMap() {
		if (CollectionUtils.isEmpty(branchMap)) {
			branchMap = ImmutableMap.<LogOperation, Function<BranchTransactionDO, Boolean>>builder()
					.put(LogOperation.BRANCH_ADD, this::insertBranchTransactionDO)
					.put(LogOperation.BRANCH_UPDATE, this::updateBranchTransactionDO)
					.put(LogOperation.BRANCH_REMOVE, this::deleteBranchTransactionDO).build();
		}
	}

	@Override
	public boolean writeSession(LogOperation logOperation, SessionStorable session) {
		if (globalMap.containsKey(logOperation) || branchMap.containsKey(logOperation)) {
			return globalMap.containsKey(logOperation)
					? globalMap.get(logOperation).apply(SessionConverter.convertGlobalTransactionDO(session))
					: branchMap.get(logOperation).apply(SessionConverter.convertBranchTransactionDO(session));
		}
		else {
			throw new StoreException("Unknown LogOperation:" + logOperation.name());
		}
	}

	/**
	 * Insert branch transaction
	 * @param branchTransactionDO
	 * @return the boolean
	 */
	private boolean insertBranchTransactionDO(BranchTransactionDO branchTransactionDO) {
		String branchKey = buildBranchKey(branchTransactionDO.getBranchId());
		String branchListKey = buildBranchListKeyByXid(branchTransactionDO.getXid());
		try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) {
			Date now = new Date();
			branchTransactionDO.setGmtCreate(now);
			branchTransactionDO.setGmtModified(now);
			pipelined.hmset(branchKey, BeanUtils.objectToMap(branchTransactionDO));
			pipelined.rpush(branchListKey, branchKey);
			pipelined.sync();
			return true;
		}
		catch (Exception ex) {
			throw new RedisException(ex);
		}
	}

	/**
	 * Delete the branch transaction
	 * @param branchTransactionDO
	 * @return
	 */
	private boolean deleteBranchTransactionDO(BranchTransactionDO branchTransactionDO) {
		String branchKey = buildBranchKey(branchTransactionDO.getBranchId());
		try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
			String xid = jedis.hget(branchKey, REDIS_KEY_BRANCH_XID);
			if (StringUtils.isEmpty(xid)) {
				return true;
			}
			String branchListKey = buildBranchListKeyByXid(branchTransactionDO.getXid());
			try (Pipeline pipelined = jedis.pipelined()) {
				pipelined.lrem(branchListKey, 0, branchKey);
				pipelined.del(branchKey);
				pipelined.sync();
			}
			return true;
		}
		catch (Exception ex) {
			throw new RedisException(ex);
		}
	}

	/**
	 * Update the branch transaction
	 * @param branchTransactionDO
	 * @return
	 */
	private boolean updateBranchTransactionDO(BranchTransactionDO branchTransactionDO) {
		String branchKey = buildBranchKey(branchTransactionDO.getBranchId());
		try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
			String previousBranchStatus = jedis.hget(branchKey, REDIS_KEY_BRANCH_STATUS);
			if (StringUtils.isEmpty(previousBranchStatus)) {
				throw new StoreException("Branch transaction is not exist, update branch transaction failed.");
			}
			Map<String, String> map = new HashMap<>(3, 1);
			map.put(REDIS_KEY_BRANCH_STATUS, String.valueOf(branchTransactionDO.getStatus()));
			map.put(REDIS_KEY_BRANCH_GMT_MODIFIED, String.valueOf((new Date()).getTime()));
			if (StringUtils.isNotBlank(branchTransactionDO.getApplicationData())) {
				map.put(REDIS_KEY_BRANCH_APPLICATION_DATA, String.valueOf(branchTransactionDO.getApplicationData()));
			}
			jedis.hmset(branchKey, map);
			return true;
		}
		catch (Exception ex) {
			throw new RedisException(ex);
		}
	}

	/**
	 * Insert the global transaction.
	 * @param globalTransactionDO
	 * @return
	 */
	private boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
		String globalKey = buildGlobalKeyByTransactionId(globalTransactionDO.getTransactionId());
		try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) {
			Date now = new Date();
			globalTransactionDO.setGmtCreate(now);
			globalTransactionDO.setGmtModified(now);
			pipelined.hmset(globalKey, BeanUtils.objectToMap(globalTransactionDO));
			String xid = globalTransactionDO.getXid();
			pipelined.rpush(buildGlobalStatus(globalTransactionDO.getStatus()), xid);
			pipelined.zadd(REDIS_SEATA_BEGIN_TRANSACTIONS_KEY,
					globalTransactionDO.getBeginTime() + globalTransactionDO.getTimeout(), globalKey);
			pipelined.sync();
			return true;
		}
		catch (Exception ex) {
			throw new RedisException(ex);
		}
	}

	/**
	 * Delete the global transaction. It will operate two parts: 1.delete the global
	 * session map 2.remove the xid from the global status list If the operate failed,the
	 * succeed operates will rollback
	 * @param globalTransactionDO
	 * @return
	 */
	private boolean deleteGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
		String globalKey = buildGlobalKeyByTransactionId(globalTransactionDO.getTransactionId());
		try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
			String xid = jedis.hget(globalKey, REDIS_KEY_GLOBAL_XID);
			if (StringUtils.isEmpty(xid)) {
				LOGGER.warn("Global transaction is not exist,xid = {}.Maybe has been deleted by another tc server",
						globalTransactionDO.getXid());
				return true;
			}
			try (Pipeline pipelined = jedis.pipelined()) {
				pipelined.lrem(buildGlobalStatus(globalTransactionDO.getStatus()), 0, globalTransactionDO.getXid());
				pipelined.del(globalKey);
				if (GlobalStatus.Begin.getCode() == globalTransactionDO.getStatus()
						|| GlobalStatus.UnKnown.getCode() == globalTransactionDO.getStatus()) {
					pipelined.zrem(REDIS_SEATA_BEGIN_TRANSACTIONS_KEY, globalKey);
				}
				pipelined.sync();
			}
			return true;
		}
		catch (Exception ex) {
			throw new RedisException(ex);
		}
	}

	/**
	 * Update the global transaction. It will update two parts: 1.the global session map
	 * 2.the global status list If the update failed,the succeed operates will rollback
	 * @param globalTransactionDO
	 * @return
	 */
	private boolean updateGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
		String xid = globalTransactionDO.getXid();
		String globalKey = buildGlobalKeyByTransactionId(globalTransactionDO.getTransactionId());
		try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
			// Defensive watch to prevent other TC server operating concurrently,Fail fast
			jedis.watch(globalKey);
			List<String> statusAndGmtModified = jedis.hmget(globalKey, REDIS_KEY_GLOBAL_STATUS,
					REDIS_KEY_GLOBAL_GMT_MODIFIED);
			String previousStatus = statusAndGmtModified.get(0);
			if (StringUtils.isEmpty(previousStatus)) {
				jedis.unwatch();
				throw new StoreException("Global transaction is not exist, update global transaction failed.");
			}
			if (previousStatus.equals(String.valueOf(globalTransactionDO.getStatus()))) {
				jedis.unwatch();
				return true;
			}
			GlobalStatus before = GlobalStatus.get(Integer.parseInt(previousStatus));
			GlobalStatus after = GlobalStatus.get(globalTransactionDO.getStatus());
			if (!SessionStatusValidator.validateUpdateStatus(before, after)) {
				throw new StoreException(
						"Illegal changing of global status, update global transaction failed." + " beforeStatus["
								+ before.name() + "] cannot be changed to afterStatus[" + after.name() + "]");
			}

			String previousGmtModified = statusAndGmtModified.get(1);
			Transaction multi = jedis.multi();
			Map<String, String> map = new HashMap<>(2);
			map.put(REDIS_KEY_GLOBAL_STATUS, String.valueOf(globalTransactionDO.getStatus()));
			map.put(REDIS_KEY_GLOBAL_GMT_MODIFIED, String.valueOf((new Date()).getTime()));
			multi.hmset(globalKey, map);
			multi.lrem(buildGlobalStatus(Integer.valueOf(previousStatus)), 0, xid);
			multi.rpush(buildGlobalStatus(globalTransactionDO.getStatus()), xid);
			multi.zrem(REDIS_SEATA_BEGIN_TRANSACTIONS_KEY, globalKey);
			List<Object> exec = multi.exec();
			if (CollectionUtils.isEmpty(exec)) {
				// The data has changed by another tc, so we still think the modification
				// is successful.
				LOGGER.warn(
						"The global transaction xid = {}, maybe changed by another TC. It does not affect the results",
						globalTransactionDO.getXid());
				return true;
			}
			String hmset = exec.get(0).toString();
			long lrem = (long) exec.get(1);
			long rpush = (long) exec.get(2);
			if (OK.equalsIgnoreCase(hmset) && lrem > 0 && rpush > 0) {
				return true;
			}
			else {
				// If someone failed, the succeed operations need rollback
				if (OK.equalsIgnoreCase(hmset)) {
					// Defensive watch to prevent other TC server operating
					// concurrently,give up this operate
					jedis.watch(globalKey);
					String xid2 = jedis.hget(globalKey, REDIS_KEY_GLOBAL_XID);
					if (StringUtils.isNotEmpty(xid2)) {
						Map<String, String> mapPrevious = new HashMap<>(2, 1);
						mapPrevious.put(REDIS_KEY_GLOBAL_STATUS, previousStatus);
						mapPrevious.put(REDIS_KEY_GLOBAL_GMT_MODIFIED, previousGmtModified);
						Transaction multi2 = jedis.multi();
						multi2.hmset(globalKey, mapPrevious);
						multi2.exec();
					}
				}
				if (lrem > 0) {
					jedis.rpush(buildGlobalStatus(Integer.valueOf(previousStatus)), xid);
				}
				if (rpush > 0) {
					jedis.lrem(buildGlobalStatus(globalTransactionDO.getStatus()), 0, xid);
				}
				return false;
			}
		}
		catch (Exception ex) {
			throw new RedisException(ex);
		}
	}

	/**
	 * Read session global session.
	 * @param xid the xid
	 * @param withBranchSessions the withBranchSessions
	 * @return the global session
	 */
	@Override
	public GlobalSession readSession(String xid, boolean withBranchSessions) {
		String transactionId = String.valueOf(XID.getTransactionId(xid));
		String globalKey = buildGlobalKeyByTransactionId(transactionId);
		try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
			Map<String, String> map = jedis.hgetAll(globalKey);
			if (CollectionUtils.isEmpty(map)) {
				return null;
			}
			GlobalTransactionDO globalTransactionDO = (GlobalTransactionDO) BeanUtils.mapToObject(map,
					GlobalTransactionDO.class);
			List<BranchTransactionDO> branchTransactionDOs = null;
			if (withBranchSessions) {
				branchTransactionDOs = this.readBranchSessionByXid(jedis, xid);
			}
			GlobalSession session = getGlobalSession(globalTransactionDO, branchTransactionDOs, withBranchSessions);
			return session;
		}
	}

	/**
	 * Read session global session.
	 * @param xid the xid
	 * @return the global session
	 */
	@Override
	public GlobalSession readSession(String xid) {
		return this.readSession(xid, true);
	}

	/**
	 * Read globalSession list by global status
	 * @param statuses the statuses
	 * @return the list
	 */
	@Override
	public List<GlobalSession> readSession(GlobalStatus[] statuses, boolean withBranchSessions) {
		List<GlobalSession> globalSessions = Collections.synchronizedList(new ArrayList<>());
		List<String> statusKeys = convertStatusKeys(statuses);
		Map<String, Integer> targetMap = calculateStatuskeysHasData(statusKeys);
		if (targetMap.size() == 0 || logQueryLimit <= 0) {
			return globalSessions;
		}
		int perStatusLimit = resetLogQueryLimit(targetMap);
		final long countGlobalSessions = targetMap.values().stream()
				.collect(Collectors.summarizingInt(Integer::intValue)).getSum();
		// queryCount
		final long queryCount = Math.min(logQueryLimit, countGlobalSessions);
		List<List<String>> list = new ArrayList<>();
		dogetXidsForTargetMapRecursive(targetMap, 0L, perStatusLimit - 1, queryCount, list);
		if (CollectionUtils.isNotEmpty(list)) {
			List<String> xids = list.stream().flatMap(Collection::stream).collect(Collectors.toList());
			xids.parallelStream().forEach(xid -> {
				GlobalSession globalSession = this.readSession(xid, withBranchSessions);
				if (globalSession != null) {
					globalSessions.add(globalSession);
				}
			});
		}
		return globalSessions;
	}

	@Override
	public List<GlobalSession> readSortByTimeoutBeginSessions(boolean withBranchSessions) {
		List<GlobalSession> list = Collections.emptyList();
		List<String> statusKeys = convertStatusKeys(GlobalStatus.Begin);
		Map<String, Integer> targetMap = calculateStatuskeysHasData(statusKeys);
		if (targetMap.size() == 0 || logQueryLimit <= 0) {
			return list;
		}
		final long countGlobalSessions = targetMap.values().stream()
				.collect(Collectors.summarizingInt(Integer::intValue)).getSum();
		// queryCount
		final long queryCount = Math.min(logQueryLimit, countGlobalSessions);
		try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
			Set<String> values = jedis.zrangeByScore(REDIS_SEATA_BEGIN_TRANSACTIONS_KEY, 0, System.currentTimeMillis(),
					0, (int) queryCount);
			List<Map<String, String>> rep;
			try (Pipeline pipeline = jedis.pipelined()) {
				for (String value : values) {
					pipeline.hgetAll(value);
				}
				rep = (List<Map<String, String>>) (List) pipeline.syncAndReturnAll();
			}
			list = rep.stream().map(map -> {
				GlobalTransactionDO globalTransactionDO = (GlobalTransactionDO) BeanUtils.mapToObject(map,
						GlobalTransactionDO.class);
				if (globalTransactionDO != null) {
					String xid = globalTransactionDO.getXid();
					List<BranchTransactionDO> branchTransactionDOs = new ArrayList<>();
					if (withBranchSessions) {
						branchTransactionDOs = this.readBranchSessionByXid(jedis, xid);
					}
					return getGlobalSession(globalTransactionDO, branchTransactionDOs, withBranchSessions);
				}
				return null;
			}).filter(Objects::nonNull).collect(Collectors.toList());
		}
		return list;
	}

	/**
	 * get everyone keys limit
	 * @param targetMap
	 * @return
	 */
	private int resetLogQueryLimit(Map<String, Integer> targetMap) {
		int resetLimitQuery = logQueryLimit;
		if (targetMap.size() > 1) {
			int size = targetMap.size();
			resetLimitQuery = (logQueryLimit / size) == 0 ? 1 : (logQueryLimit / size);
		}
		return resetLimitQuery;
	}

	/**
	 * read the global session list by different condition
	 * @param sessionCondition the session condition
	 * @return the global sessions
	 */
	@Override
	public List<GlobalSession> readSession(SessionCondition sessionCondition) {
		List<GlobalSession> globalSessions = new ArrayList<>();
		if (StringUtils.isNotEmpty(sessionCondition.getXid())) {
			GlobalSession globalSession = this.readSession(sessionCondition.getXid(),
					!sessionCondition.isLazyLoadBranch());
			if (globalSession != null) {
				globalSessions.add(globalSession);
			}
			return globalSessions;
		}
		else if (sessionCondition.getTransactionId() != null) {
			GlobalSession globalSession = this.readSessionByTransactionId(
					sessionCondition.getTransactionId().toString(), !sessionCondition.isLazyLoadBranch());
			if (globalSession != null) {
				globalSessions.add(globalSession);
			}
			return globalSessions;
		}
		else if (CollectionUtils.isNotEmpty(sessionCondition.getStatuses())) {
			if (sessionCondition.getStatuses().length == 1 && sessionCondition.getStatuses()[0] == GlobalStatus.Begin) {
				return this.readSortByTimeoutBeginSessions(!sessionCondition.isLazyLoadBranch());
			}
			else {
				return readSession(sessionCondition.getStatuses(), !sessionCondition.isLazyLoadBranch());
			}
		}
		return null;
	}

	/**
	 * query GlobalSession by status with page
	 * @param param
	 * @return List<GlobalSession>
	 */
	public List<GlobalSession> readSessionStatusByPage(GlobalSessionParam param) {
		List<GlobalSession> globalSessions = new ArrayList<>();

		int pageNum = param.getPageNum();
		int pageSize = param.getPageSize();
		int start = Math.max((pageNum - 1) * pageSize, 0);
		int end = pageNum * pageSize - 1;

		if (param.getStatus() != null) {
			String statusKey = buildGlobalStatus(GlobalStatus.get(param.getStatus()).getCode());
			try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
				final List<String> xids = jedis.lrange(statusKey, start, end);
				xids.forEach(xid -> {
					GlobalSession globalSession = this.readSession(xid, param.isWithBranch());
					if (globalSession != null) {
						globalSessions.add(globalSession);
					}
				});
			}
		}
		return globalSessions;
	}

	/**
	 * assemble the global session and branch session
	 * @param globalTransactionDO the global transactionDo
	 * @param branchTransactionDOs the branch transactionDos
	 * @param withBranchSessions if read branch sessions
	 * @return the global session with branch session
	 */
	private GlobalSession getGlobalSession(GlobalTransactionDO globalTransactionDO,
			List<BranchTransactionDO> branchTransactionDOs, boolean withBranchSessions) {
		GlobalSession globalSession = SessionConverter.convertGlobalSession(globalTransactionDO, !withBranchSessions);
		if (CollectionUtils.isNotEmpty(branchTransactionDOs)) {
			for (BranchTransactionDO branchTransactionDO : branchTransactionDOs) {
				globalSession.add(SessionConverter.convertBranchSession(branchTransactionDO));
			}
		}
		return globalSession;
	}

	/**
	 * read the global session by transactionId
	 * @param transactionId the transaction id
	 * @param withBranchSessions if read branch sessions
	 * @return the global session
	 */
	private GlobalSession readSessionByTransactionId(String transactionId, boolean withBranchSessions) {
		String globalKey = buildGlobalKeyByTransactionId(transactionId);
		String xid = null;
		try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
			Map<String, String> map = jedis.hgetAll(globalKey);
			if (CollectionUtils.isEmpty(map)) {
				return null;
			}
			GlobalTransactionDO globalTransactionDO = (GlobalTransactionDO) BeanUtils.mapToObject(map,
					GlobalTransactionDO.class);
			if (globalTransactionDO != null) {
				xid = globalTransactionDO.getXid();
			}
			List<BranchTransactionDO> branchTransactionDOs = new ArrayList<>();
			if (withBranchSessions) {
				branchTransactionDOs = this.readBranchSessionByXid(jedis, xid);
			}
			return getGlobalSession(globalTransactionDO, branchTransactionDOs, withBranchSessions);
		}
	}

	/**
	 * Read the branch session list by xid
	 * @param jedis the jedis
	 * @param xid the xid
	 * @return the branch transactionDo list
	 */
	private List<BranchTransactionDO> readBranchSessionByXid(Jedis jedis, String xid) {
		List<BranchTransactionDO> branchTransactionDOs = new ArrayList<>();
		String branchListKey = buildBranchListKeyByXid(xid);
		List<String> branchKeys = lRange(jedis, branchListKey);
		if (CollectionUtils.isNotEmpty(branchKeys)) {
			try (Pipeline pipeline = jedis.pipelined()) {
				branchKeys.stream().forEach(branchKey -> pipeline.hgetAll(branchKey));
				List<Object> branchInfos = pipeline.syncAndReturnAll();
				for (Object branchInfo : branchInfos) {
					if (branchInfo != null) {
						Map<String, String> branchInfoMap = (Map<String, String>) branchInfo;
						Optional<BranchTransactionDO> branchTransactionDO = Optional.ofNullable(
								(BranchTransactionDO) BeanUtils.mapToObject(branchInfoMap, BranchTransactionDO.class));
						branchTransactionDO.ifPresent(branchTransactionDOs::add);
					}
				}
			}
		}
		if (CollectionUtils.isNotEmpty(branchTransactionDOs)) {
			Collections.sort(branchTransactionDOs);
		}
		return branchTransactionDOs;
	}

	private List<String> lRange(Jedis jedis, String key) {
		List<String> keys = new ArrayList<>();
		List<String> values;
		int limit = 20;
		int start = 0;
		int stop = limit;
		for (;;) {
			values = jedis.lrange(key, start, stop);
			keys.addAll(values);
			if (CollectionUtils.isEmpty(values) || values.size() < limit) {
				break;
			}
			start = keys.size();
			stop = start + limit;
		}
		return keys;
	}

	public List<BranchTransactionDO> findBranchSessionByXid(String xid) {
		try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
			return readBranchSessionByXid(jedis, xid);
		}
	}

	/**
	 * query globalSession by page
	 * @param pageNum
	 * @param pageSize
	 * @param withBranchSessions
	 * @return List<GlobalSession>
	 */
	public List<GlobalSession> findGlobalSessionByPage(int pageNum, int pageSize, boolean withBranchSessions) {
		List<GlobalSession> globalSessions = new ArrayList<>();
		int start = Math.max((pageNum - 1) * pageSize, 0);
		int end = pageNum * pageSize - 1;

		List<String> statusKeys = convertStatusKeys(GlobalStatus.values());
		Map<String, Integer> stringLongMap = calculateStatuskeysHasData(statusKeys);

		List<List<String>> list = dogetXidsForTargetMap(stringLongMap, start, end, pageSize);

		if (CollectionUtils.isNotEmpty(list)) {
			List<String> xids = list.stream().flatMap(Collection::stream).collect(Collectors.toList());
			xids.forEach(xid -> {
				if (globalSessions.size() < pageSize) {
					GlobalSession globalSession = this.readSession(xid, withBranchSessions);
					if (globalSession != null) {
						globalSessions.add(globalSession);
					}
				}
			});
		}
		return globalSessions;
	}

	/**
	 * query and sort existing values
	 * @param statusKeys
	 * @return
	 */
	private Map<String, Integer> calculateStatuskeysHasData(List<String> statusKeys) {
		Map<String, Integer> resultMap = new LinkedHashMap<>();
		Map<String, Integer> keysMap = new HashMap<>(statusKeys.size());
		try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) {
			statusKeys.forEach(key -> pipelined.llen(key));
			List<Long> counts = (List) pipelined.syncAndReturnAll();
			for (int i = 0; i < counts.size(); i++) {
				if (counts.get(i) > 0) {
					keysMap.put(statusKeys.get(i), counts.get(i).intValue());
				}
			}
		}

		// sort
		List<Map.Entry<String, Integer>> list = new ArrayList<>(keysMap.entrySet());
		list.sort((o1, o2) -> o2.getValue() - o1.getValue());
		list.forEach(e -> resultMap.put(e.getKey(), e.getValue()));

		return resultMap;
	}

	/**
	 * count GlobalSession total by status
	 * @param values
	 * @return Long
	 */
	public Long countByGlobalSessions(GlobalStatus[] values) {
		List<String> statusKeys = new ArrayList<>();
		Long total = 0L;
		for (GlobalStatus status : values) {
			statusKeys.add(buildGlobalStatus(status.getCode()));
		}
		try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) {
			statusKeys.stream().forEach(statusKey -> pipelined.llen(statusKey));
			List<Long> list = (List<Long>) (List) pipelined.syncAndReturnAll();
			if (list.size() > 0) {
				total = list.stream().mapToLong(value -> value).sum();
			}
			return total;
		}
	}

	private List<String> convertStatusKeys(GlobalStatus... statuses) {
		List<String> statusKeys = new ArrayList<>();
		for (int i = 0; i < statuses.length; i++) {
			statusKeys.add(buildGlobalStatus(statuses[i].getCode()));
		}
		return statusKeys;
	}

	private void dogetXidsForTargetMapRecursive(Map<String, Integer> targetMap, long start, long end, long queryCount,
			List<List<String>> listList) {

		long total = listList.stream().mapToLong(List::size).sum();

		if (total >= queryCount) {
			return;
		}
		// when start greater than offset(totalCount)
		if (start >= queryCount) {
			return;
		}

		if (targetMap.size() == 0) {
			return;
		}

		try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
			Iterator<Map.Entry<String, Integer>> iterator = targetMap.entrySet().iterator();
			while (iterator.hasNext()) {
				String key = iterator.next().getKey();
				final long sum = listList.stream().mapToLong(List::size).sum();
				final long diffCount = queryCount - sum;
				if (diffCount <= 0) {
					return;
				}
				List<String> list;
				if (end - start >= diffCount) {
					long endNew = start + diffCount - 1;
					list = jedis.lrange(key, start, endNew);
				}
				else {
					list = jedis.lrange(key, start, end);
				}

				if (list.size() > 0) {
					listList.add(list);
				}
				else {
					iterator.remove();
				}
			}
		}
		long startNew = end + 1;
		long endNew = startNew + end - start;
		dogetXidsForTargetMapRecursive(targetMap, startNew, endNew, queryCount, listList);
	}

	private List<List<String>> dogetXidsForTargetMap(Map<String, Integer> targetMap, int start, int end,
			int totalCount) {
		List<List<String>> listList = new ArrayList<>();
		try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
			for (String key : targetMap.keySet()) {
				final List<String> list = jedis.lrange(key, start, end);
				final long sum = listList.stream().mapToLong(List::size).sum();
				if (list.size() > 0 && sum < totalCount) {
					listList.add(list);
				}
				else {
					start = 0;
					end = totalCount - 1;
				}
			}
		}
		return listList;
	}

	private String buildBranchListKeyByXid(String xid) {
		return REDIS_SEATA_BRANCHES_PREFIX + xid;
	}

	private String buildGlobalKeyByTransactionId(Object transactionId) {
		return REDIS_SEATA_GLOBAL_PREFIX + transactionId;
	}

	private String buildBranchKey(Long branchId) {
		return REDIS_SEATA_BRANCH_PREFIX + branchId;
	}

	private String buildGlobalStatus(Integer status) {
		return REDIS_SEATA_STATUS_PREFIX + status;
	}

	/**
	 * Sets log query limit.
	 * @param logQueryLimit the log query limit
	 */
	public void setLogQueryLimit(int logQueryLimit) {
		this.logQueryLimit = logQueryLimit;
	}

}
